# -*- coding: utf-8 -*-
# @Author  : Pony
# @Time    : 2021/7/7 下午2:22

import json
from kafka import KafkaConsumer

# 创建一个消费者，指定了topic,group_id,bootstrap_servers
# group_id:多个拥有相同group_id的消费者被判定为一组，一条数据记录只会被同一个组中的一个消费者消费
# bootstrap_servers：kafka的节点，多个节点使用逗号分隔
# 这种方式只会获取新产生的数据

consumer = KafkaConsumer("ks_mission_control",  # 订阅单个个主题
                         bootstrap_servers=['172.20.8.146:9092',],
                         value_deserializer=lambda m: json.loads(m.decode('utf8')),

                         # auto_offset_reset='earliest'   # 从最早的的消息开始消费 )
                         )

# consumer.subscribe(topics=['test', 'test0']) # 订阅多个主题

for msg in consumer:  # 迭代器，等待下一条消息
    # print(msg)  # 打印消息
    # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
    print(len(msg.value['msgBody']['bins']))
    print(msg.value['msgBody']['bins'])
